package com.kaxiu.config.mqtt;

import com.alibaba.fastjson.JSONObject;
import com.kaxiu.config.redis.RedisService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.stream.CharacterStreamReadingMessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Header;

import java.util.Random;
import java.util.UUID;

/**
 * @author: LiYang
 * @create: 2019-08-10 12:12
 * @Description: 只能发
 **/
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Autowired
    private RedisService redisService;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl() });
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }


//    接收消息
    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttProperties.getConsumerClientId(),
                        mqttClientFactory(),
                        mqttProperties.getDefaultTopic(),
                        mqttProperties.getLocationTopic()
                ))
                .handle(m -> {
                    if(mqttProperties.getDefaultTopic().equals(m.getHeaders().get("mqtt_receivedTopic"))){
                        System.out.println(m.getPayload());
                    }else if(mqttProperties.getLocationTopic().equals(m.getHeaders().get("mqtt_receivedTopic"))){
                        JSONObject location = JSONObject.parseObject(m.getPayload().toString());
                        String userId = location.getString("userId");
                        redisService.setLocation(Long.valueOf(userId), location);
                        System.out.println(m.getPayload());
                    }
                })
                .get();
    }


    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


//    发送消息
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    }

}
